第6章 I/O复用:select和poll函数

6.1 概述

内核一旦发现进程指定的一个或多个I/O条件就绪(也就是说输入已准备好被读取,或者描述符已能承接更多的输出),它就通知进程,这个能力称为I/O复用。

I/O复用由select和poll两个函数支持,前者较新的称为pselect的POSIX变种。

I/O复用并非只限于网络编程,I/O复用典型使用在下列网络应用场合:

  • 当客户处理多个描述符(通常是交互式输入和网络套接字)时,必须使用I/O复用
  • 一个客户同时处理多个套接字是可能的,不过比较少见
  • 如果一个TCP服务器既要处理监听套接字,又要处理已连接套接字,一般就要使用I/O复用
  • 如果一个服务器既要处理TCP,又要处理UDP,一般就要使用I/O复用
  • 如果一个服务器要处理多个服务或者多个协议,一般就要使用I/O复用

6.2 I/O模型

Unix下可用的I/O模型有五种:

  • 阻塞式I/O
  • 非阻塞式I/O
  • I/O复用(select和poll)
  • 信号驱动式I/O(SIGIO)
  • 异步I/O(POSIX的aio_系列函数)

一个输入操作通常包括两个不同的阶段:

  1. 等待数据准备好
  2. 从内核向进程复制数据

对于一个套接字上的输入操作:

  1. 第一步通常涉及等待数据从网络中到达,当所等待的分组到达时,它被复制到内核中的某个缓冲区
  2. 把数据从内核缓冲区复制到应用进程缓冲区

6.2.1 阻塞式I/O阻塞

最流行的I/O模型是阻塞式I/O模型。默认情况下,所有套接字都是阻塞的。

image-20200220142525476

6.2.2 非阻塞式I/O模型

进程把一个套接字设置为非阻塞是在通知内核:当所请求的I/O操作非把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。

image-20200220143123892

轮询(polling):应用进程持续轮询内核,以查看某个操作是否就绪,这样做往往耗费大量CPU时间。

6.2.3 I/O复用模型

通过调用select或poll,阻塞在这两个系统调用中的某一个之上,而不是阻塞在真正的I/O调用上

image-20200220211237813

I/O复用需要两个系统调用

优势:可以等待多个描述符就绪

6.2.4 信号驱动式I/O模型

使用信号,让内核在描述符就绪时发送SIGIO信号通知我们,这种模式为信号驱动式I/O。

image-20200220211730610

优势:等待数据报到达期间进程不被阻塞,主循环可以继续执行,只要等待来自信号处理函数的通知:既可以是数据已经准备好被处理,也可以是数据报已准备好被读取

6.2.5 异步I/O模型

异步I/O由POSIX规范定义。

异步函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。

与信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

image-20200220222137004

POSIX异步函数以aio_或lio_开头。

这里调用aio_read函数,给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,而且在等待I/O完成期间,进程不被阻塞。

6.2.6 各种I/O模型的比较

POSIX定义:

  • 同步I/O操作:导致请求进程阻塞,知道I/O操作完成

  • 异步I/O操作:不导致请求进程阻塞

image-20200220222618152

前4种模型都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将进程阻塞。只有异步I/O模型与POSIX定义的异步I/O相匹配。

6.3 select函数

该函数允许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。

select函数告知内核对哪些描述符(就读、写或异常条件)感兴趣以及等待多长时间。描述符不局限于套接字,任何描述符都可以使用select来测试。

#include <sys/select.h>
#include <sys/time.h>

//若有就绪描述符则返回其数目,若超时则返回0,若出错则返回-1
int select(int maxfdp1, fd_set *readset, fd_set *writeset, 
           fd_set *exceptset, const struct timeval *timeout);

参数解析:

  1. timeout:告知内核等待所指定描述符中的任何一个就绪可花多长时间

    struct timeval{
        long     tv_sec;    //秒数
        long     tv_usec;   //微秒数
    }
    

    该参数有三种情况:

    • 永远等待下去:仅在有一个描述符准备好I/O时才返回,此时将该参数设置为空指针
    • 等待一段固定时间:在有一个描述符准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数
    • 根本不等待:检测描述符后立即返回,这称为轮询。该参数指向timeval结构,其中定时器的值必须为0

    前两种情况的等待通常会被进程在等待期间捕获的信号中断,并从信号处理函数返回。

    timeval结构允许我们指定一个微秒级的分辨率,然而内核支持的真实分辨率往往粗糙得多,许多Unix内核把超时值向上舍入成10ms的倍数,另外内核还需要额外的调度延迟。

    timeout参数的const限定词表示它在函数返回时不会被select修改,无法通过该参数计算出实际等待时间。

  2. 中间三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述符

    • 目前支持的异常条件只有两个:

      1. 某个套接字的带外数据的到达
      2. 某个已置为分组模式的伪终端存在可从其主端读取的控制状态信息
    • select使用描述符集来给3个参数中的每一个参数指定一个或多个描述符值,通常是一个整数数组,其中每个整数中的每一位对应一个描述符。具体实现与应用程序无关,隐藏在数据类型fd_set和以下四个宏中:

      void FD_ZERO(fd_set *fdset);		//clear all bits of fdset
      void FD_SET(int fd, fd_set *fdset);	//turn on the bit for fd in fdset
      void FD_CLR(int fd, fd_set *fdset);	//turn off the bit for fd in fdset
      void FD_ISSET(int fd, fd_set *fdset);//is the bit for fd on fdset ?
      

      我们分配一个fd_set类型的描述符集,并用这些宏设置和测试集合中的每一位,也可以使用赋值语句将它赋值成另一个描述符集:

      fd_set rset;
      
      FD_ZERO(&rset);	//初始化
      FD_SET(1, &rset);
      FD_SET(4, &rset);
      FD_SET(5, &rset);
      
    1. maxfdpl参数指定待测试的描述符的个数,它的值是待测试的最大描述符加1,描述符0,1,2……一直到maxfdpl-1均将被测试
    2. 头文件<sys/select.h>中定义的FD_SETSIZE常值是数据类型fd_set中描述符总数,通常是1024,通常使用不了那么多
    3. select函数修改由指针readset、writeset和exceptset所指向的描述符集,因此这三个参数都是值-结果传参。函数返回时,指示哪些描述符已就绪,描述符集内任何与未就绪描述符对应的位返回时均清零,因此每次重新调用select函数时,都需要重新为关心的位均置1。

6.3.1 描述符就绪条件

满足下列条件之一,则套接字准备好读:

  1. 该套接字的接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记SO_RCVLOWAT的当前大小,对该套接字读操作不阻塞并返回一个大于0的值
  2. 该连接的读半部关闭(也就是接收了FIN的TCP连接),读操作不阻塞并返回0
  3. 该套接字是一个监听套接字,且已完成的连接数不为0,不阻塞
  4. 其上有一个套接字错误待处理,不阻塞并返回-1,同时把设置errno为错误条件。此时可以使用getsockopt来读取和清除该错误

满足下列条件之一,则套接字准备好写:

  1. 该套接字的发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的当前大小,并且该套接字已经连接或者不需要连接(如UDP套接字),对该套接字写操作不阻塞并返回一个大于0的值
  2. 该连接的写半部关闭,对这样的套接字的写操作将产生SIGPIPE信号
  3. 使用非阻塞式connect的套接字已建立连接,或者connect已经以失败告终
  4. 其上有一个套接字错误待处理,不阻塞并返回-1,同时把设置errno为错误条件

如果一个套接字存在带外数据或者仍处于带外标记,那么它有异常条件待处理。当某个套接字上发生错误时,它将由select标记为既可读又可写。

接收低水位标记和发送低水位标记的目的在于:允许应用进程控制在select返回可读或可写条件之前有多少数据可读或有多大空间可用于写。

举例:当数据少于64字节时,应用程序没有任何有效工作可做,则把接收低水位标记设置为64,以防少于64字节的数据准备好时select唤醒程序。

任何UDP套接字只要其发送低水位标记小于等于发送缓冲区大小(默认关系)就总是可写的,这是因为UDP套接字不需要连接。

image-20200805213132250

6.3.2 select的最大描述符数

最初设计select时,操作系统通常对每个进程可用的最大描述符数设置了上限,select就使用了相同的限制。

当今的Unix版本允许每个进程使用事实上无数目限制的描述符(往往仅受限于内存总量和管理性限制)。

表面上可以通过将FD_SETSIZE定义为某个更大的值,实际上却行不通,首先它是内核集成的,修改后需要重新编译内核,其次可能存在扩展性问题。

有些应用程序开始改用poll代替select,典型例子是需要复选大量描述符的事件驱动型服务器程序,所需描述符量超过1024个

6.4 str_cli函数(修订版)

原先版本可能阻塞于fgets调用,新版改为阻塞于select调用,或是等待标准输入可读,或是等待套接字可读。

image-20200805220011258

客户的套接字上的三个条件处理如下:

  • 如果对端TCP发送数据,那么该套接字变为可读,并且read返回一个大于0的值(读入数据的字节数)
  • 如果对端TCP发送一个FIN(对端进程终止),那么该套接字变为可读,并且read返回0(EOF)
  • 如果对端TCP发送一个RST(对端主机崩溃并重新启动),那么该套接字变为可读,并且read返回-1,而errno中含有确切的错误码
#include "unp.h"

void
str_cli(FILE *fp, int sockfd){
    int 	maxfdpl;
    fd_set	rset;
    char	sendline[MAXLINE], recvline[MAXLINE];
    
    FD_ZERO(&rset);
    for( ; ; ){
        FD_SET(fileno(fp), &rset);
        FD_SET(sockfd, &rset);
        //fileno函数把标准I/O文件指针转换为对应的描述符
        maxfdpl = max(fileno(fp), sockfd) + 1;
        Select(maxfdpl, &rset, NULL, NULL, NULL);
        
        if(FD_ISSET(sockfd, &rset)){		//socket is readable
            if(Readline(sockfd, recvline, MAXLINE) == 0)
                err_quit("str_cli: server terminated prematurely");
            Fputs(recvline, stdout);
        }
        
        if(FD_ISSET(fileno(fp), &rset)){	//input is readable
            if(Fgets(sendline, MAXLINE, fp) == NULL)
                return;
            Writen(sockfd, sendline, strlen(sendline));
        }
    }
}

6.5 批量输入

当客户端使用停-等方式工作时,虽然对交互式使用是合适的,但是却不能实现对通信管道的高效利用。

如果把客户与服务器之间的网络作为全双工管道考虑,请求从客户想服务器发送,应答从服务器向客户发送,则停-等方式如下图:

image-20200806100510176

在Unix的shell环境下,很容易实现重定向标准输入和标准输出,从而可以批量运行客户。当我们把标准输入和标准输出重定向到文件来运行新的客户程序时,却发现输出文件总是小于输入文件(对于回射服务器而言理应相等)。

image-20200806101504809

当运行一个客户程序时,标准输入的EOF同时意味着完成从套接字的读入。批量运行客户程序时,客户程序写完请求时,并不能立即关闭连接,因为管道中还有其它的请求和应答,但是修订版的str_cli函数对标准输入EOF的处理却是返回到main函数,而main函数随后终止。

我们需要的是一种关闭TCP连接其中一半的方法,即给服务器发送一个FIN告诉它我们已经完成了数据发送,但仍然保持套接字描述符打开以便读取。由shutdown函数完成。

为了提升性能而引入的缓冲机制增加了网络应用程序的复杂性:

  • fgets读取输入,将数据存放在stdio缓冲区,但是fgets只返回其中一行,其余仍在缓冲区。select处理完一行后再次被调用等待新的工作,并不知道stdio使用了缓冲区——它只是从read系统的角度指出是否有数据可读,而不是从fgets之类的调用角度

  • readline调用时,select不可见的数据隐藏在readline自己的缓冲区中

6.6 shutdown函数

终止网络连接的通常方法是调用close函数,不过close函数有两个限制,却可以使用shutdown来避免:

  • close把描述符引用计数减1,仅在计数变为0时才关闭套接字。shutdown可以不管引用计数就激发TCP的正常连接终止序列
  • close终止读和写两个方向的数据传送。既然TCP连接是全双工的,有时候我们需要告知对端已经完成数据发送,即使对端仍有数据要发送给我们。

image-20200806105214830

#include <sys/socket.h>
//成功返回0,出错返回-1
int shutdown(int sockfd, int howto);

函数行为依赖于howto参数的值:

  • SHUT_RD:关闭连接的读一半——套接字中不再有数据可接收,套接字接收缓冲区中的现有数据都被丢弃。进程不能再对套接字调用任何读函数。TCP套接字调用shutdown函数后,由该套接字接收的来自对端的任何数据都将被确认,然后悄然丢弃
  • SHUT_WR:关闭连接的写一半——对于TCP套接字,这称为半关闭。当前留在套接字发送缓冲区的数据将被发送掉,后跟TCP正常连接终止序列。不能再对套接字调用任何写函数
  • SHUT_RDWR:连接的读半部和写半部都关闭——等效于调用两次shutdown,第一次指定SHUT_RD,第二次指定SHUT_WR

这三个SHUT_XXX名字由POSIX规范定义,howto参数的典型值将会是0(关闭读半部)、1(关闭写半部)和2(读半部和写半部都关闭)。

6.7 str_cli函数(再修订版)

改进(且正确)版本:

  • 服务器一关闭它那一端的连接立马得到通知
  • 可以正确处理批量输入
  • 废弃以文本行为中心,改而针对缓冲区操作
#include "unp.h"

void
str_cli(FILE *fp, int sockfd){
    int 	maxfdpl, stdineof;
    fd_set	rset;
    char	buf[MAXLINE];
    int n;
    
    stdineof = 0;
    FD_ZERO(&rset);
    for( ; ; ){
        if(stdineof == 0)
            FD_SET(fileno(fp), &rset);
        FD_SET(sockfd, &rset);
        //fileno函数把标准I/O文件指针转换为对应的描述符
        maxfdpl = max(fileno(fp), sockfd) + 1;
        Select(maxfdpl, &rset, NULL, NULL, NULL);
        
        if(FD_ISSET(sockfd, &rset)){		//socket is readable
            if( (n = Read(sockfd, buf, MAXLINE)) == 0){
                if(stdineof == 1)
                    return;
                else
                    err_quit("str_cli: server terminated prematurely");
            }
            Write(fileno(stdout), buf, n)
        }
        
        if(FD_ISSET(fileno(fp), &rset)){	//input is readable
            if( (n = Read(fileno(fp), buf, MAXLINE)) == 0){
                stdineof = 1;
                Shutdown(sockfd, SHUT_WR);
                FD_CLR(fileno(fp), &rset);
                continue;
            }
            Writen(sockfd, buf, n);
        }
    }
}

6.8 TCP回射服务器程序(修订版)

使用select来处理任意个客户的单进程程序,而不是为每个客户派生一个子进程。

#include "unp.h"

int
main(int argc, char **argv){
    int		i, maxi, maxfd, listenfd, connfd, sockfd;
    //服务器所能处理的最大客户数目的限制是:
    //	min[FD_SETSIZE, 内核允许本进程打开的最大描述符数]
    //进程能打开的描述符数数目上已经无限制,只受资源和内存限制
    int		nready, client[FD_SETSIZE];
    ssize_t	n;
    fd_set	rset, allset;
    char	buf[MAXLINE];
    socklen_t	clilen;
    struct sockaddr_in cliaddr, servaddr;
    
    listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);
    
    Bind(listenfd, (SA*)&servaddr, sizeof(servaddr));
    
    Listen(listenfd, LISTENQ);
    
    //描述符集前三位分别被设置为:标准输入、标准输出和标准错误输出
    //select第一个参数为:maxfd+1
    maxfd = listenfd;
    maxi = -1;
    for(i = 0; i < FD_SETSIZE; i++)
        client[i] = -1;
    FD_ZERO(&allset);
    FD_SET(listenfd, &allset);
    
    for( ; ; ){
        rset = allset;
        nready = Select(maxfd+1, &rset, NULL, NULL, NULL);
        
        if(FD_ISSET(listenfd, &rset)){
            clilen = sizeof(cliaddr);
            connfd = Accept(listenfd, (SA*) &cliaddr, &clilen);
            
            for(i = 0; i < FD_SETSIZE; i++){
                if(client[i] < 0){
                    client[i] = connfd;
                    break;
                }
            }
            if(i == FD_SETSIZE)
                err_quit("too many clients");
            FD_SET(connfd, &allset);
            if(connfd > maxfd)
                maxfd = connfd;
            if(i > maxi)
                maxi = i;
            if(--nready <= 0)
                continue;
        }
        for(i = 0; i <= maxi; i++){
            if((sockfd = client[i]) < 0)
                continue;
            if(FD_ISSET(sockfd, &rset)){
                if((n = Read(sockfd, buf, MAXLINE)) == 0){
                    Close(sockfd);
                    FD_CLR(sockfd, &allset);
                    client[i] = -1;
                }else
                    Writen(sockfd, buf, n);
                if(--nready <= 0)
                    break;
            }
        }
    }
}

面向文本行服务器程序存在一个问题:如果一个恶意的客户连接到服务器,发送一个字节的数据(不是换行符)后进入睡眠,服务器将会调用readline从客户读入这个单字节的数据,然后阻塞于下一个read(readline内部的read)调用,等待客户其它数据。服务器因此阻塞而不能再为其它客户提供服务,直到那个恶意的客户发出一个换行符或终止为止。

当前版本的服务器程序已经弃用面向文本行的方法,等待换行输入或EOF而引起的拒绝服务攻击已经不复存在。

拒绝服务型攻击:当一个服务器在处理多个客户时,它绝对不能阻塞于只与单个客户相关的某个函数调用。否则可能导致服务器被挂起,拒绝为所有其它客户提供服务。

可能的解决办法:

  • 使用非阻塞式I/O
  • 让每个客户由单独的控制线程提供服务
  • 对I/O操作设置一个超时

6.9 pselect函数

pselect函数是由POSIX发明的,如今许多Unix变种支持它

#include <sys/select.h>
#include <signal.h>
#include <time.h>
//若有就绪描述符则返回其数目,超时返回0,出错返回-1
int pselect(int maxfdpl, fd_set *readset, 
            fd_set *writeset, fd_set *exceptset,
            const struct timespec *timeout, const sigset_t *sigmark);

pselect相对于通常的select有两个变化:

  • pselect使用timespec结构(POSIX的一个发明),不使用timeval结构

    struct timespec{
        time_t	tv_sec;
        //新结构第二个成员指定纳秒数,旧结构指定微秒数
        long	tv_nsec;
    };
    
  • pselect函数增加了第六个参数:一个指向信号掩码的指针。该参数允许程序先禁止递交某些信号,再测试由这些当前被禁止信号的信号处理函数设置的全局变量,然后调用pselect,然后告诉它重新设置信号掩码

6.10 poll函数

poll函数起源于SVR3,最初局限于流设备,SVR4取消了这种限制,允许poll工作在任何描述符上。

poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息。

#include <poll.h>

struct pollfd{
    int		fd;
    short	events;	//指定要测试的条件
    short	revents;//返回描述符的状态
}
//若有就绪描述符返回其数目,超时返回0,出错返回-1
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

image-20210122225638690

图分为三个部分:

  • 第一部分:处理输入的四个常值
  • 第二部分:处理输出的三个常值
  • 第三部分:处理错误的三个常值,不能在events中设置,但是当相应条件存在时就在revents中返回

poll识别三类数据:普通(normal)、优先级带(priority band)、高优先级(high priority)

就TCP和UDP套接字而言,以下条件引起poll返回特定的revent。不幸的是,POSIX在其poll的实现中留了许多空洞(即有多种方法可返回相同的条件):

  • 所有正规的TCP数据和所有UDP数据都被认为是普通数据
  • TCP的带外数据被认为是优先级带数据
  • 当TCP连接的读半部关闭时,也被认为是普通数据,随后读操作返回0
  • TCP连接存在错误既可以认为是普通数据,也可以是错误(POLLERR)。读操作后都将返回-1,并设置errno,可用于收到RST或发生超时等条件
  • 在监听套接字上有新的连接可用既可以认为是普通数据,也可以认为是优先级数据。大多数实现视为普通数据
  • 非阻塞connect的完成被认为是使相应套接字可写

参数nfds指定结构数组中元素的个数;timeout参数指定poll函数返回前等待多长时间,它是一个指定应等待毫秒数的正值。

image-20200806153002725

INFTIM常值被定义为一个负值,如果系统不能提供毫秒级精度的定时器,该值就向上舍入到最接近的支持值。

两种方法用来(也许只是暂时的)关闭对单个文件描述符的检查,而不需要重新建立整个fds列表:

  • 将events设为0
  • 如果不再关心某个特定描述符,那么可以把与它对应的pollfd结构的fd成员设置成一个负值,poll函数将忽略这样的pollfd结构的events成员,返回时将它的revents成员的值设置为0。

6.11 TCP回射服务器程序(再修订版)

#include "unp.h"
#include <limits.h>	//for OPEN_MAX

int
main(int argc, char **argv){
    int		i, maxi, listenfd, connfd, sockfd;
    int		nready;
    ssize_t	n;
    char	buf[MAXLINE];
    socklen_t	clilen;
    struct pollfd client[OPEN_MAX];
    struct sockaddr_in cliaddr, servaddr;
    
    listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);
    
    Bind(listenfd, (SA*)&servaddr, sizeof(servaddr));
    
    Listen(listenfd, LISTENQ);
    
    client[0].fd = listenfd;
    client[1].events = POLLRDNORM;
    for(i = 1; i < OPEN_MAX; i++)
        client[i].fd = -1;
    maxi = 0;	//含义client数组当前正在使用的最大下标值
    
    for( ; ; ){
        nready = Poll(client, maxi + 1, INFTIM);
        
        if(client[0].revents & POLLRDNORM){
            clilen = sizeof(cliaddr);
            connfd = Accept(listenfd, (SA*) &cliaddr, &clilen);
            
            for(i = 1; i < OPEN_MAX; i++){
                if(client[i].fd < 0){
                    client[i].fd = connfd;
                    break;
                }
            }
            if(i == OPEN_MAX)
                err_quit("too many clients");
            client[i].events = POLLRDNORM;
            if(i > maxfi)
                maxfi = i;
            if(i > maxi)
                maxi = i;
            if(--nready <= 0)
                continue;
        }
        for(i = 1; i <= maxi; i++){
            if((sockfd = client[i].fd) < 0)
                continue;
            if(client[i].revents & (POLLRDNORM | POLLERR)){
                if((n = Read(sockfd, buf, MAXLINE)) < 0){
                    if(errno == ECONNRESET){
                        Close(sockfd);
                        client[i].fd = -1;
                    }else
                        err_SYS("read error");
                }else if(n == 0){
                    Close(sockfd);
                    client[i].fd = -1;
                }else
                    Writen(sockfd, buf, n);
                if(--nready <= 0)
                    break;
            }
        }
    }
}